The members of the Analytics Services team here at Activision are heavy users of Mesos and Marathon to deploy and manage services on our clusters. We are also huge fans of Python and the Jupyter project.
The Jupyter project was recently reorganized out of IPython, in a move referred to as "the split": one component that originally lived inside IPython (IPython.parallel) was spun off into a separate project, ipyparallel. This powerful piece of the IPython ecosystem is generally overlooked.
In this post I will give a quick introduction to the ipyparallel project and then introduce a new launcher we have open sourced to deploy IPython clusters into Mesos clusters. While we have published this notebook in HTML, please feel free to download the original to follow along.
The ipyparallel project is the new home of the IPython.parallel module that was hosted within IPython core before 2015. The focus of the project is interactive cluster computing. This focus on interactive computing and first-class integration with the IPython project is a distinguishing feature. For a more complete dive into the internals of ipyparallel, please visit the docs. I aim to give the bare minimum to get you started.
At the most basic level an IPython cluster is a set of Python interpreters that can be accessed over TCP. Under the hood, it works similarly to how Jupyter/IPython work today. When you open a new notebook in the browser, a Python process (called a kernel) will be started to run the code you submit. ipyparallel does the same thing except instead of a single Python kernel, you can start many distributed kernels over many machines.
There are three main components to the stack: the Client you interact with, the Controller that coordinates work, and the Engines that actually execute your code.
The easiest way to get your hands dirty is to spin up a cluster locally. That is, you will run the Client, Controller, and Engines all on your local machine. The hardest part of provisioning distributed clusters is making sure all the pieces can talk to each other (as usual, the easiest solution to a distributed problem is to make it local).
Our team uses conda to help manage our computational environments (Python and beyond). Here is a quick run-through to get set up (our public conda recipes are here). A combination of pip and virtualenv will also work, but once you start installing packages from the scipy stack we find conda the easiest to use.
First, find your version of Miniconda from here.
If you're using Linux, these commands will work:
wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh # follow prompts
conda update --all
# make a new python 3 env named py3
conda create -n py3 python=3 ipython ipyparallel ipython-notebook
source activate py3
While there are lower level commands to start and configure Controllers and Engines, the primary command you will use is ipcluster. This is a helpful utility to start all the components and configure your local client. By default, it uses the LocalControllerLauncher and the LocalEngineSetLauncher, which is exactly what we want to start with.
Open a terminal, install ipyparallel, and start a cluster.
(py3)➜ ipcluster start --n=4
2016-04-11 22:24:15.514 [IPClusterStart] Starting ipcluster with [daemon=False]
2016-04-11 22:24:15.515 [IPClusterStart] Creating pid file: /home/vagrant/.ipython/profile_default/pid/ipcluster.pid
2016-04-11 22:24:15.515 [IPClusterStart] Starting Controller with LocalControllerLauncher
2016-04-11 22:24:16.519 [IPClusterStart] Starting 4 Engines with LocalEngineSetLauncher
2016-04-11 22:24:46.633 [IPClusterStart] Engines appear to have started successfully
In [29]:
# You can also start the cluster from the notebook with an IPython shell command, but errors are harder to see and stopping the cluster can be janky.
!ipcluster start -n 4 --daemon
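For completeness, a cluster daemonized this way can be shut down later with the matching stop command (run it at the end of your session, not now):
!ipcluster stop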
If started correctly we should now have four engines running on our local machine. Now to actually interact with them. First we need to import the client.
In [30]:
import ipyparallel as ipp
rc = ipp.Client()
In [31]:
rc.ids # list the ids of the engines the client can communicate with
Out[31]:
The client has two primary ways to farm out work to the engines. The first is a DirectView, which is used to apply the same work to all engines; to create a DirectView, just slice the client. The second is a LoadBalancedView, which we will cover later in the post.
In [32]:
dv = rc[:]
dv
Out[32]:
With a direct view you can issue a function to execute within the context of each engine's Python process.
In [33]:
def get_engine_pid():
    import os
    return os.getpid()

dv.apply_sync(get_engine_pid)
Out[33]:
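Slicing is not limited to taking every engine; any slice of the client gives you a DirectView over just that subset. A quick sketch reusing the function above (the even-numbered split is purely illustrative):
# target only the even-numbered engines
even_engines = rc[::2]
even_engines.apply_sync(get_engine_pid)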
This pattern is so common that ipyparallel provides an IPython magic to execute a code cell on all engines: %%px
In [34]:
%%px
import os
os.getpid()
It is key to notice that the engines are fully fledged, stateful Python interpreters. If you set a variable within a %%px code block, it will remain there.
In [35]:
%%px
foo = 'bar on pid {}'.format(os.getpid())
In [36]:
%%px
foo
The DirectView object provides some syntactic sugar to help distribute data to each engine. The first is dictionary-style retrieval and assignment. Let's retrieve the value of foo from each engine.
In [37]:
dv['foo']
Out[37]:
Now we can overwrite its value.
In [38]:
dv['foo'] = 'bar'
dv['foo']
Out[38]:
There are many cases where you don't want the same data on each machine, but rather you want to chunk a list and distribute each chunk to an engine. The DirectView provides the .scatter and .gather methods for this.
In [39]:
# start with a list of ids to work on
user_ids = list(range(1000))
dv.scatter('user_id_chunk', user_ids)
Out[39]:
Notice that this method completed almost immediately and returned an AsyncResult. All the methods we have used up to now have been blocking and synchronous; the scatter method is async. To turn this scatter into a blocking call we can chain a .get() onto it.
In [40]:
dv.scatter('user_id_chunk', user_ids).get()
Out[40]:
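If you hold on to the AsyncResult instead of chaining .get() immediately, you can poll or wait on it. A minimal sketch (the timeout value is arbitrary):
ar = dv.scatter('user_id_chunk', user_ids)
ar.ready()           # False until every engine has received its chunk
ar.wait(timeout=5)   # block for at most 5 seconds
ar.get()             # re-raises any engine error, otherwise returns the results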
Now we have a variable on each engine that holds an equal amount of the original list.
In [41]:
%%px
print("Len", len(user_id_chunk))
print("Max", max(user_id_chunk))
Let's apply a simple function to each chunk. First, declare a function within each engine. The --local flag also executes the code block in your local client, which is very useful when debugging your code.
In [42]:
%%px --local
def the_most_interesting_transformation_ever(user_id):
    """
    This function is really interesting
    """
    return "ID:{}".format(user_id * 3)
In [43]:
the_most_interesting_transformation_ever(1)
Out[43]:
In [44]:
%%px
transformed_user_ids = list(map(the_most_interesting_transformation_ever, user_id_chunk))
Now we have four separate lists of transformed ids. We want to stitch the disparate lists into one list in our local notebook; gather is used for that.
In [45]:
all_transformed_user_ids = dv.gather('transformed_user_ids').get()
In [46]:
print(len(all_transformed_user_ids))
print(all_transformed_user_ids[0:10])
Obviously, this example is contrived. The serialization cost of shipping Python objects over the wire to each engine is more expensive than the calculation we performed. This tradeoff between serialization/transport vs computation cost is central to any decision to use distributed processing. However, there are many highly parallelizable problems where this project can be extremely useful. Some of the main use cases we use ipyparallel for are hyperparameter searches and bulk loading/writing from storage systems.
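To make the hyperparameter use case concrete, here is a minimal sketch of the pattern using a toy stand-in for model training; train_and_score and the parameter grid are hypothetical, not part of any real pipeline:
from itertools import product

def train_and_score(params):
    # stand-in for fitting a model and returning a validation score
    alpha, depth = params
    return {'alpha': alpha, 'depth': depth, 'score': alpha / (1 + depth)}

param_grid = list(product([0.01, 0.1, 1.0], [2, 4, 8]))

# ship one parameter set per call to the engines and score them in parallel
results = dv.map_sync(train_and_score, param_grid)
best = max(results, key=lambda r: r['score'])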
The previous example where you scatter a list, perform a calculation, and then gather a result works for lots of problems. One issue with this approach is that each engine does an identical amount of work. If the complexity of the process each engine is performing is variable, this naive scheduling approach can waste processing power and time. Take for example this function:
In [47]:
%%px --local
import random
import time
def fake_external_io(url):
    # Simulate variable complexity/latency
    time.sleep(random.random())
    return "HTML for URL: {}".format(url)
In [48]:
%time fake_external_io(1)
Out[48]:
In [49]:
%time fake_external_io(1)
Out[49]:
If you had a list of urls to scrape and gave each worker an equal share, some workers would finish early and then sit around doing nothing. A better approach is to assign work to each engine as it finishes. This way the work is load balanced over the cluster and your process completes sooner. ipyparallel provides the LoadBalancedView for this exact use case. (For this specific problem, threading or an async event loop would likely be a better way to speed up or scale out, but suspend your disbelief for this exercise.)
In [50]:
lview = rc.load_balanced_view()
lview
Out[50]:
In [51]:
@lview.parallel()
@ipp.require('time', 'random')
def p_fake_external_io(url):
    # Simulate variable complexity/latency
    time.sleep(random.random())
    return "HTML for URL: {}".format(url)
Here we used two ipyparallel decorators. First, we used lview.parallel() to declare this a parallel function. Second, we declared that this function depends on the time and random modules. Now that we have a load balanced function we can compare timings with our naive approach.
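As an aside, another way to make modules available on the engines (instead of @ipp.require) is the DirectView's sync_imports context manager, which performs the imports both locally and on every engine:
with dv.sync_imports():
    import random
    import time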
In [52]:
urls = ['foo{}.com'.format(i) for i in range(100)]
In [92]:
# Naive single threaded
%time res = list(map(fake_external_io, urls))
In [93]:
dv.scatter('urls', urls).get()
Out[93]:
In [94]:
# seed for some semblance of reproducibility
%px random.seed(99)
In [95]:
# Naive assignment
%time %px results = list(map(fake_external_io, urls))
In [96]:
# Load balanced version
%time res = p_fake_external_io.map(urls).get()
This isn't a perfect example, but you get the idea. The larger the number of inputs to your parallel problem and the more variable the run time of each component process, the more time you save by switching to a load balanced view.
This is only scratching the surface of the ipyparallel project. I would highly recommend taking a look at the docs. Here is a list of further topics to look into if you are interested.
Our team at Activision largely uses IPython clusters for distributed model training. This project has been vital for hyperparameter searches for our machine learning models: being able to easily parallelize these searches beyond one machine, utilizing hundreds of cores, has sped up training by orders of magnitude.
The examples so far are a useful introduction to the API and some features of ipyparallel. Hopefully you are convinced to try out the library. However, deploying a working cluster beyond a single machine introduces some issues.
ipyparallel provides support for a range of cluster and batch job management systems such as PBS and WindowsHPC; the full list is in the documentation. ipyparallel also provides an SSH-based launcher: given passwordless ssh onto your machines, you can easily deploy engines and connect them to your controller and client. There is also the wonderful starcluster project, which helps spin up machines from cloud providers.
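As a taste of how these launchers are wired up, the SSH launcher is driven by a profile's ipcluster_config.py, roughly like the sketch below (the hostnames and engine counts are placeholders; check the ipyparallel docs for the full set of options):
# create a dedicated profile first: ipython profile create --parallel --profile=ssh
# then edit ~/.ipython/profile_ssh/ipcluster_config.py
c.IPClusterStart.controller_launcher_class = 'SSH'
c.IPClusterEngines.engine_launcher_class = 'SSH'
c.SSHEngineSetLauncher.engines = {
    'host1.example.com': 4,   # start 4 engines on host1
    'host2.example.com': 8,   # and 8 on host2
}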
These tools are great: if you have access to existing HPC clusters, or are planning on deploying dedicated clusters either on your own cold iron (the 2016 version of bare metal) or in the cloud, then they will meet your needs.
However, we are big users of Mesos, Docker, and Marathon to manage our clusters and services. Furthermore, even with the existing launchers, managing complex dependencies within the engines is a pain. Using Docker to package all dependencies makes deploying heterogeneous clusters easier. Targeting our existing cluster management system and simplifying dependencies is a big win for us.
With this in mind, we are open sourcing a new ipyparallel launcher that deploys IPython clusters into Mesos using Docker and Marathon. The code lives here and on pypi/conda as ipyparallel_mesos.
We have two pre-built Docker images for the Controller and Engine. These are stripped-down images; internally we use conda for almost all our dependencies, even inside our Docker containers (please visit our public conda recipes and channel). Extending from the ipyparallel-marathon-engine image will allow you to easily install your custom dependencies, with or without conda.
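Getting started follows the same launcher workflow as above: install the package, create a parallel profile, and point the launcher configuration at Marathon. A rough outline (the exact configuration keys and Marathon settings are covered in the README, so treat this as a pointer rather than a reference):
pip install ipyparallel_mesos          # also published on our conda channel
ipython profile create --parallel --profile=mesos
# then edit ~/.ipython/profile_mesos/ipcluster_config.py to select the
# Marathon-backed controller/engine launchers and point them at your Marathon master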
The project is young, but hopefully you will find it useful. Please note that it currently targets Python 3; PRs are welcome to support older versions of Python (it's 2016, we can now refer to 2.7 as old). Please open any issues on the GitHub page, and read the project's README for more details.